package com.dahuan.processfunction;

import com.dahuan.bean.SensorReading;
import org.apache.flink.api.common.state.ValueState;
import org.apache.flink.api.common.state.ValueStateDescriptor;
import org.apache.flink.api.java.tuple.Tuple;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.KeyedProcessFunction;
import org.apache.flink.util.Collector;

public class Process_ApplicationCase {
    /**
     * Builds and runs the streaming job.
     *
     * <p>Text lines are read from a socket on {@code localhost:8888}, parsed into
     * {@link SensorReading}s, keyed by the {@code id} field and passed to
     * {@link TempConsIncreWarning} configured with an interval of 10 seconds. Whatever that
     * function emits is printed to standard output.
     *
     * <p>Every input line must contain at least three comma-separated fields in the order
     * {@code id,timestamp,temperature}; surrounding whitespace in each field is ignored.
     *
     * @param args command-line arguments (unused)
     * @throws Exception if the job cannot be executed or fails while running
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // Run every operator of this job with a single parallel subtask.
        env.setParallelism( 1 );

        final String host = "localhost";
        final int port = 8888;
        DataStream<String> inputStream = env.socketTextStream( host, port );

        DataStream<SensorReading> dataStream = inputStream.map( line -> {
            String[] fields = line.split( "," );
            if (fields.length < 3) {
                // Fail with the offending line instead of a bare ArrayIndexOutOfBoundsException.
                throw new IllegalArgumentException(
                        "Expected 'id,timestamp,temperature' but got: " + line );
            }
            // valueOf replaces the deprecated boxing constructors new Long(String) / new Double(String).
            return new SensorReading(
                    fields[0].trim(),
                    Long.valueOf( fields[1].trim() ),
                    Double.valueOf( fields[2].trim() ) );
        } );

        // Keying by field name produces a Tuple key, which is the key type TempConsIncreWarning declares.
        dataStream.keyBy( "id" )
                .process( new TempConsIncreWarning( 10 ) )
                .print();

        env.execute( "Process_ApplicationCase" );
    }

    /**
     * Custom process function that emits a warning when a sensor's temperature keeps rising
     * for a configured number of seconds.
     *
     * <p>For each key the function remembers the previous temperature and the timestamp of a
     * pending processing-time timer:
     * <ul>
     *   <li>a reading strictly higher than the previous one, while no timer is pending,
     *       registers a timer {@code interval} seconds in the future;</li>
     *   <li>a reading strictly lower than the previous one, while a timer is pending,
     *       deletes that timer;</li>
     *   <li>a reading equal to the previous one neither starts nor cancels a timer;</li>
     *   <li>when the timer fires, a warning string is emitted and the timer state is cleared.</li>
     * </ul>
     *
     * <p>Keyed state is only touched from {@link #processElement} and {@link #onTimer}; it is
     * deliberately not cleared in {@code close()}, because no specific key is in scope there.
     */
    public static class TempConsIncreWarning extends KeyedProcessFunction<Tuple, SensorReading, String> {
        /** Length of the observation interval, in seconds. */
        private final int interval;

        /** Temperature of the previous reading for the current key; empty before the first reading. */
        private transient ValueState<Double> lastTempState;
        /** Timestamp of the pending processing-time timer for the current key; empty when none is pending. */
        private transient ValueState<Long> timerTsState;

        /**
         * Creates the function.
         *
         * @param interval number of seconds the temperature must keep rising before a warning is emitted
         * @throws IllegalArgumentException if {@code interval} is {@code null} or not positive
         */
        public TempConsIncreWarning(Integer interval) {
            if (interval == null || interval <= 0) {
                throw new IllegalArgumentException(
                        "interval must be a positive number of seconds, got: " + interval );
            }
            this.interval = interval;
        }

        /**
         * Obtains the keyed state handles.
         *
         * <p>No default value is configured for {@code last-temp}; a missing previous
         * temperature is handled explicitly in {@link #processElement}.
         */
        @Override
        public void open(Configuration parameters) throws Exception {
            lastTempState = getRuntimeContext().getState( new ValueStateDescriptor<>( "last-temp", Double.class ) );
            timerTsState = getRuntimeContext().getState( new ValueStateDescriptor<>( "timer-ts", Long.class ) );
        }

        /**
         * Compares the incoming reading with the previous one for the same key and starts or
         * cancels the warning timer accordingly, then stores the new temperature.
         *
         * @param value the incoming sensor reading
         * @param ctx   context giving access to the timer service
         * @param out   collector for emitted records (unused here; warnings are emitted from {@link #onTimer})
         */
        @Override
        public void processElement(SensorReading value, Context ctx, Collector<String> out) throws Exception {
            Double lastTemp = lastTempState.value();
            Long timerTs = timerTsState.value();
            double currentTemp = value.getTemperature();

            // The very first reading of a key has nothing to be compared with.
            if (lastTemp != null) {
                if (currentTemp > lastTemp && timerTs == null) {
                    // Temperature rose and no timer is pending: start waiting for `interval` seconds.
                    long ts = ctx.timerService().currentProcessingTime() + interval * 1000L;
                    ctx.timerService().registerProcessingTimeTimer( ts );
                    timerTsState.update( ts );
                } else if (currentTemp < lastTemp && timerTs != null) {
                    // Temperature dropped while a timer is pending: the rise was interrupted.
                    ctx.timerService().deleteProcessingTimeTimer( timerTs );
                    timerTsState.clear();
                }
            }

            // Remember this reading for the next comparison.
            lastTempState.update( currentTemp );
        }

        /**
         * Called when the pending timer fires: emits the warning for the current key and
         * clears the timer state so that a later rise can register a new timer.
         *
         * @param timestamp the timestamp of the firing timer
         * @param ctx       context giving access to the current key
         * @param out       collector the warning is written to
         */
        @Override
        public void onTimer(long timestamp, OnTimerContext ctx, Collector<String> out) throws Exception {
            out.collect( "传感器" + ctx.getCurrentKey().getField( 0 ) + "温度值连续" + interval + "s上升" );
            timerTsState.clear();
        }
    }
}
